/*
 * Copyright (c) 2017, MegaEase
 * All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package grpcserver

import (
	"fmt"
	"reflect"
	"sync/atomic"
	"time"

	"github.com/megaease/easegress/pkg/context"
	"github.com/megaease/easegress/pkg/filters/proxies/grpcproxy"
	"github.com/megaease/easegress/pkg/graceupdate"
	"github.com/megaease/easegress/pkg/logger"
	"github.com/megaease/easegress/pkg/supervisor"
	"github.com/megaease/easegress/pkg/util/limitlistener"
	"google.golang.org/grpc"
	"google.golang.org/grpc/keepalive"
)

const (
	// Lifecycle states of the runtime, stored in runtime.state and exposed
	// through Status.State:
	//   - stateNil:     initial state set by newRuntime.
	//   - stateFailed:  set when listening fails in startServer or when a
	//                   serve failure event of the current round is handled.
	//   - stateRunning: set by startServer when it begins a start attempt.
	//   - stateClosed:  set by handleEventClose.
	stateNil     stateType = "nil"
	stateFailed  stateType = "failed"
	stateRunning stateType = "running"
	stateClosed  stateType = "closed"

	// checkFailedTimeout is the ticker period newRuntime passes to checkFailed,
	// i.e. how often a failed runtime is probed for a restart.
	checkFailedTimeout = 10 * time.Second
)

var (
	// errNil is the sentinel "no error" value: an error with an empty message.
	// Status reports the runtime as healthy when the stored error's message
	// equals errNil's message.
	//
	// gnet is the listener factory used by startServer (gnet.Listen).
	// NOTE(review): presumably it lets listeners survive a graceful update of
	// the process — confirm in pkg/graceupdate.
	errNil = fmt.Errorf("")
	gnet   = graceupdate.Global
)

type (
	// stateType names a lifecycle state of the runtime (see the state* constants).
	stateType string

	// eventCheckFailed is sent by checkFailed whenever it observes stateFailed;
	// handling it retries startServer.
	eventCheckFailed struct{}
	// eventServeFailed is sent by the serving goroutine started in startServer
	// when Serve returns a non-nil error. roundNum is the start round of the
	// server that failed and err is the error returned by Serve.
	eventServeFailed struct {
		roundNum uint64
		err      error
	}
	// eventReload carries the next supervisor spec and mux mapper; handling it
	// calls reload. The sender is not part of this file.
	eventReload struct {
		nextSuperSpec *supervisor.Spec
		muxMapper     context.MuxMapper
	}
	// eventClose requests shutdown. done is closed by handleEventClose once the
	// shutdown steps have run, which unblocks Close.
	eventClose struct{ done chan struct{} }

	// runtime owns the gRPC server and its listener, and serializes all
	// lifecycle changes through eventChan, which is consumed by fsm.
	//
	// Fields:
	//   - superSpec/spec: the current supervisor spec and the typed object spec
	//     extracted from it; spec is nil until the first reload.
	//   - s: the current grpc.Server, nil until startServer gets past listening.
	//   - mux: the request router, reloaded on every reload.
	//   - roundNum: incremented atomically on every startServer call and used
	//     to discard serve-failure events from earlier rounds.
	//   - eventChan: buffered (capacity 10) queue of *event* values.
	//   - state/err: status values read by Status; accessed only through
	//     setState/getState and setError/getError.
	//   - limitListener: connection-limiting listener created in startServer;
	//     nil until the first successful listen.
	runtime struct {
		superSpec *supervisor.Spec
		spec      *Spec
		s         *grpc.Server
		mux       *mux
		roundNum  uint64
		eventChan chan interface{}

		// status
		state atomic.Value // stateType
		err   atomic.Value // error

		limitListener *limitlistener.LimitListener
	}
	// Status contains all status generated by runtime, for displaying to users.
	// Health is true when no error is recorded, State is the current lifecycle
	// state, and Error is the recorded error message (omitted from JSON when empty).
	Status struct {
		Health bool      `json:"health"`
		State  stateType `json:"state"`
		Error  string    `json:"error,omitempty"`
	}
)

// newRuntime builds a runtime for the given supervisor spec and mux mapper,
// initializes its status to "no error" / stateNil, and launches the two
// background goroutines: the event loop (fsm) and the periodic failure check.
// No server is started here; startServer is only reached later through
// reload or the failure check.
func newRuntime(superSpec *supervisor.Spec, muxMapper context.MuxMapper) *runtime {
	rt := new(runtime)
	rt.superSpec = superSpec
	rt.mux = newMux(muxMapper)
	// Buffered so that senders do not block on a briefly busy event loop.
	rt.eventChan = make(chan interface{}, 10)

	rt.setError(errNil)
	rt.setState(stateNil)

	go rt.fsm()
	go rt.checkFailed(checkFailedTimeout)

	return rt
}

// Close asks the event loop to shut the runtime down and blocks until
// handleEventClose has run and closed the completion channel.
func (r *runtime) Close() {
	finished := make(chan struct{})
	r.eventChan <- &eventClose{done: finished}
	<-finished
}

// Status returns a snapshot of the runtime's status: the recorded error
// message, the current state, and a health flag that is true when the
// recorded message equals the message of the errNil sentinel.
func (r *runtime) Status() *Status {
	msg := r.getError().Error()

	status := &Status{
		Error: msg,
		State: r.getState(),
	}
	status.Health = msg == errNil.Error()

	return status
}

// fsm is the finite-state-machine loop of the runtime. It consumes events
// from r.eventChan one at a time and dispatches each to its handler, so all
// lifecycle changes are applied sequentially. The loop ends after a close
// event has been handled.
func (r *runtime) fsm() {
	for raw := range r.eventChan {
		switch ev := raw.(type) {
		case *eventReload:
			r.handleEventReload(ev)
		case *eventServeFailed:
			r.handleEventServeFailed(ev)
		case *eventCheckFailed:
			r.handleEventCheckFailed(ev)
		case *eventClose:
			r.handleEventClose(ev)
			// NOTE: r.eventChan is deliberately left open, so that any
			// goroutine sending an event to it later does not panic.
			return
		default:
			logger.Errorf("BUG: unknown event: %T\n", ev)
		}
	}
}

// reload applies a new supervisor spec and mux mapper.
//
// The mux is always reloaded. The server itself is started when there was no
// previous spec, and restarted (graceful stop, then start) only when
// needRestartServer reports a change that requires it; otherwise only the
// stored spec is replaced. The connection limit of an existing listener is
// updated in place.
func (r *runtime) reload(nextSuperSpec *supervisor.Spec, muxMapper context.MuxMapper) {
	r.superSpec = nextSuperSpec
	r.mux.reload(nextSuperSpec, muxMapper)

	nextSpec := nextSuperSpec.ObjectSpec().(*Spec)

	// NOTE: Due to the mechanism of supervisor, nextSpec must not be nil;
	// this branch is purely defensive.
	if nextSpec == nil {
		logger.Errorf("BUG: nextSpec is nil")
		if r.spec != nil {
			r.spec = nil
			r.closeServer()
		}
		return
	}

	// r.limitListener does not exist yet when the config is loaded for the
	// first time right after the process started.
	if r.limitListener != nil {
		r.limitListener.SetMaxConnection(nextSpec.MaxConnections)
	}

	// First spec ever: nothing to compare against, just start.
	if r.spec == nil {
		r.spec = nextSpec
		r.startServer()
		return
	}

	// Compare against the old spec before it is replaced.
	restart := r.needRestartServer(nextSpec)
	r.spec = nextSpec
	if restart {
		r.closeServer()
		r.startServer()
	}
}

// setState atomically records the runtime's lifecycle state so that it can be
// read concurrently by getState (and therefore by Status and checkFailed).
func (r *runtime) setState(state stateType) {
	r.state.Store(state)
}

// getState atomically loads the runtime's lifecycle state. It panics if no
// state has been stored yet; newRuntime stores stateNil before the runtime is
// handed out.
func (r *runtime) getState() stateType {
	current := r.state.Load()
	return current.(stateType)
}

// setError atomically records err as the runtime's current error. A nil err
// is recorded as the errNil sentinel.
func (r *runtime) setError(err error) {
	if err != nil {
		// NOTE: For type safety. atomic.Value requires every stored value to
		// have the same concrete type, so the error is re-created via
		// fmt.Errorf (keeping only its message) instead of stored as is.
		r.err.Store(fmt.Errorf("%v", err))
		return
	}
	r.err.Store(errNil)
}

// getError atomically loads the runtime's current error. It never returns
// nil: when nothing has been stored yet the errNil sentinel is returned.
func (r *runtime) getError() error {
	if stored := r.err.Load(); stored != nil {
		return stored.(error)
	}
	return errNil
}

// needRestartServer reports whether switching from the current spec to
// nextSpec requires restarting the gRPC server. Both specs are copied, the
// fields that can be applied without a restart are blanked out on the copies,
// and the remainders are compared deeply. r.spec must be non-nil.
func (r *runtime) needRestartServer(nextSpec *Spec) bool {
	current, next := *r.spec, *nextSpec

	// Changes to the options below do not require restarting the gRPC server.
	current.MaxConnections, next.MaxConnections = 0, 0
	current.CacheSize, next.CacheSize = 0, 0
	current.XForwardedFor, next.XForwardedFor = false, false
	current.IPFilter, next.IPFilter = nil, nil
	current.Rules, next.Rules = nil, nil

	if reflect.DeepEqual(current, next) {
		return false
	}
	return true
}

// startServer begins a new serving round: it bumps the round counter, resets
// the status to running / no error, listens on the configured port and serves
// gRPC on a connection-limited listener in a background goroutine.
//
// If listening fails, the state becomes stateFailed with the error recorded,
// and no server is created. If Serve later returns an error, the goroutine
// reports it as an eventServeFailed tagged with this round's number.
func (r *runtime) startServer() {
	round := atomic.AddUint64(&r.roundNum, 1)
	r.setError(errNil)
	r.setState(stateRunning)

	addr := fmt.Sprintf(":%d", r.spec.Port)
	lis, err := gnet.Listen("tcp", addr)
	if err != nil {
		r.setState(stateFailed)
		r.setError(err)
		return
	}

	// Every call is routed through the mux as an "unknown service".
	serverOpts := []grpc.ServerOption{
		grpc.UnknownServiceHandler(r.mux.handler),
		grpc.CustomCodec(&grpcproxy.GrpcCodec{}),
	}
	serverOpts = append(serverOpts, r.buildServerKeepaliveOpt()...)

	limited := limitlistener.NewLimitListener(lis, uint32(r.spec.MaxConnections))
	r.limitListener = limited

	server := grpc.NewServer(serverOpts...)
	r.s = server

	// The goroutine captures the local server and listener rather than
	// reading r.s / r.limitListener, to avoid a data race with later restarts.
	go func() {
		if serveErr := server.Serve(limited); serveErr != nil {
			r.eventChan <- &eventServeFailed{
				roundNum: round,
				err:      serveErr,
			}
		}
	}()
}

// buildServerKeepaliveOpt translates the keepalive-related fields of r.spec
// into gRPC server options.
//
// It always returns exactly two options, in this order: the keepalive
// enforcement policy (MinTime, PermitWithoutStream) and the keepalive server
// parameters (MaxConnectionAge, MaxConnectionAgeGrace, Time, Timeout,
// MaxConnectionIdle). Duration fields are strings in time.ParseDuration
// syntax; an empty string leaves the corresponding value at zero. A value
// that fails to parse is also left at zero, but the failure is logged rather
// than silently discarded.
func (r *runtime) buildServerKeepaliveOpt() []grpc.ServerOption {
	// parse converts one duration field of the spec; field is only used to
	// identify the offending option in the log message.
	parse := func(field, value string) time.Duration {
		if value == "" {
			return 0
		}
		d, err := time.ParseDuration(value)
		if err != nil {
			logger.Errorf("invalid keepalive %s %q, ignored: %v", field, value, err)
			return 0
		}
		return d
	}

	enforcement := keepalive.EnforcementPolicy{
		MinTime:             parse("MinTime", r.spec.MinTime),
		PermitWithoutStream: r.spec.PermitWithoutStream,
	}

	params := keepalive.ServerParameters{
		MaxConnectionIdle:     parse("MaxConnectionIdle", r.spec.MaxConnectionIdle),
		MaxConnectionAge:      parse("MaxConnectionAge", r.spec.MaxConnectionAge),
		MaxConnectionAgeGrace: parse("MaxConnectionAgeGrace", r.spec.MaxConnectionAgeGrace),
		Time:                  parse("Time", r.spec.Time),
		Timeout:               parse("Timeout", r.spec.Timeout),
	}

	return []grpc.ServerOption{
		grpc.KeepaliveEnforcementPolicy(enforcement),
		grpc.KeepaliveParams(params),
	}
}

// closeServer gracefully stops the current gRPC server, if one was created.
// GracefulStop blocks, so this call returns only once it has completed.
func (r *runtime) closeServer() {
	if r.s == nil {
		return
	}
	r.s.GracefulStop()
}

// checkFailed runs for the lifetime of the runtime. Every timeout period it
// inspects the state: while the runtime is failed it queues an
// eventCheckFailed so the event loop retries the start; once the runtime is
// closed it stops the ticker and returns.
func (r *runtime) checkFailed(timeout time.Duration) {
	ticker := time.NewTicker(timeout)
	defer ticker.Stop()

	for range ticker.C {
		switch r.getState() {
		case stateFailed:
			r.eventChan <- &eventCheckFailed{}
		case stateClosed:
			return
		}
	}
}

// handleEventCheckFailed retries starting the server, but only if the
// runtime is still failed by the time the event is processed.
func (r *runtime) handleEventCheckFailed(e *eventCheckFailed) {
	if r.getState() != stateFailed {
		return
	}
	r.startServer()
}

// handleEventServeFailed marks the runtime as failed with the reported error,
// unless the event comes from a round older than the current one (a server
// that has since been replaced), in which case it is ignored.
func (r *runtime) handleEventServeFailed(e *eventServeFailed) {
	if currentRound := atomic.LoadUint64(&r.roundNum); currentRound <= e.roundNum {
		r.setState(stateFailed)
		r.setError(e.err)
	}
}

// handleEventReload applies the spec and mux mapper carried by the event.
func (r *runtime) handleEventReload(e *eventReload) {
	nextSuperSpec, muxMapper := e.nextSuperSpec, e.muxMapper
	r.reload(nextSuperSpec, muxMapper)
}

// handleEventClose shuts the runtime down: it marks the state as closed and
// clears the error, gracefully stops the server, closes the mux, and finally
// closes e.done to unblock the caller waiting in Close.
func (r *runtime) handleEventClose(e *eventClose) {
	// The state is set first, so checkFailed sees stateClosed on its next
	// tick and exits.
	r.setState(stateClosed)
	r.setError(errNil)
	r.closeServer()
	r.mux.close()
	// Signal completion only after all resources have been released.
	close(e.done)
}
